home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyo (Python 2.5)
-
- ident = '$Id: Utility.py 1405 2007-07-10 20:25:44Z boverhof $'
- import sys
- import types
- import httplib
- import urllib
- import socket
- import weakref
- from os.path import isfile
- from string import join, strip, split
- from UserDict import UserDict
- from cStringIO import StringIO
- from TimeoutSocket import TimeoutSocket, TimeoutError
- from urlparse import urlparse
- from httplib import HTTPConnection, HTTPSConnection
- from exceptions import Exception
-
- try:
- from ZSI import _get_idstr
- except:
-
- def _get_idstr(pyobj):
- x = id(pyobj)
- if x < 0:
- return 'x%x' % abs(x)
-
- return 'o%x' % x
-
-
- import xml.dom.minidom as xml
- from xml.dom import Node
- import logging
- from c14n import Canonicalize
- from Namespaces import SCHEMA, SOAP, XMLNS, ZSI_SCHEMA_URI
-
- try:
- from xml.dom.ext import SplitQName
- except:
-
- def SplitQName(qname):
- l = qname.split(':')
- if len(l) == 1:
- l.insert(0, None)
- elif len(l) == 2:
- if l[0] == 'xmlns':
- l.reverse()
-
- else:
- return None
- return tuple(l)
-
-
- basejoin = urllib.basejoin
- if sys.version_info[0:2] < (2, 4, 0, 'final', 0)[0:2]:
- token = './'
-
- def basejoin(base, url):
- if url.startswith(token) is True:
- return urllib.basejoin(base, url[2:])
-
- return urllib.basejoin(base, url)
-
-
-
- class NamespaceError(Exception):
- pass
-
-
- class RecursionError(Exception):
- pass
-
-
- class ParseError(Exception):
- pass
-
-
- class DOMException(Exception):
- pass
-
-
- class Base:
-
- def __init__(self, module = __name__):
- self.logger = logging.getLogger('%s-%s(%s)' % (module, self.__class__, _get_idstr(self)))
-
-
-
- class HTTPResponse:
-
- def __init__(self, response):
- self.status = response.status
- self.reason = response.reason
- self.headers = response.msg
- if not response.read():
- pass
- self.body = None
- response.close()
-
-
-
- class TimeoutHTTP(HTTPConnection):
-
- def __init__(self, host, port = None, timeout = 20):
- HTTPConnection.__init__(self, host, port)
- self.timeout = timeout
-
-
- def connect(self):
- self.sock = TimeoutSocket(self.timeout)
- self.sock.connect((self.host, self.port))
-
-
-
- class TimeoutHTTPS(HTTPSConnection):
-
- def __init__(self, host, port = None, timeout = 20, **kwargs):
- HTTPSConnection.__init__(self, str(host), port, **kwargs)
- self.timeout = timeout
-
-
- def connect(self):
- sock = TimeoutSocket(self.timeout)
- sock.connect((self.host, self.port))
- realsock = getattr(sock.sock, '_sock', sock.sock)
- ssl = socket.ssl(realsock, self.key_file, self.cert_file)
- self.sock = httplib.FakeSocket(sock, ssl)
-
-
-
- def urlopen(url, timeout = 20, redirects = None):
- (scheme, host, path, params, query, frag) = urlparse(url)
- if scheme not in ('http', 'https'):
- return urllib.urlopen(url)
-
- if params:
- path = '%s;%s' % (path, params)
-
- if query:
- path = '%s?%s' % (path, query)
-
- if frag:
- path = '%s#%s' % (path, frag)
-
- if scheme == 'https':
-
- try:
- import M2Crypto
- except ImportError:
- if not hasattr(socket, 'ssl'):
- raise RuntimeError, 'no built-in SSL Support'
-
- conn = TimeoutHTTPS(host, None, timeout)
-
- ctx = M2Crypto.SSL.Context()
- ctx.set_session_timeout(timeout)
- conn = M2Crypto.httpslib.HTTPSConnection(host, ssl_context = ctx)
- conn.set_debuglevel(1)
- else:
- conn = TimeoutHTTP(host, None, timeout)
- conn.putrequest('GET', path)
- conn.putheader('Connection', 'close')
- conn.endheaders()
- response = None
- while None:
- response = conn.getresponse()
- if response.status != 100:
- break
-
- conn._HTTPConnection__state = httplib._CS_REQ_SENT
- conn._HTTPConnection__response = None
- continue
- status = response.status
- if status >= 300 and status < 400:
- location = response.msg.getheader('location')
- if location is not None:
- response.close()
- if redirects is not None and redirects.has_key(location):
- raise RecursionError('Circular HTTP redirection detected.')
-
- if redirects is None:
- redirects = { }
-
- redirects[location] = 1
- return urlopen(location, timeout, redirects)
-
- raise HTTPResponse(response)
-
- if not status >= 200 and status < 300:
- raise HTTPResponse(response)
-
- body = StringIO(response.read())
- response.close()
- return body
-
-
- class DOM:
- NS_SOAP_ENV_1_1 = 'http://schemas.xmlsoap.org/soap/envelope/'
- NS_SOAP_ENC_1_1 = 'http://schemas.xmlsoap.org/soap/encoding/'
- NS_SOAP_ENV_1_2 = 'http://www.w3.org/2001/06/soap-envelope'
- NS_SOAP_ENC_1_2 = 'http://www.w3.org/2001/06/soap-encoding'
- NS_SOAP_ENV_ALL = (NS_SOAP_ENV_1_1, NS_SOAP_ENV_1_2)
- NS_SOAP_ENC_ALL = (NS_SOAP_ENC_1_1, NS_SOAP_ENC_1_2)
- NS_SOAP_ENV = NS_SOAP_ENV_1_1
- NS_SOAP_ENC = NS_SOAP_ENC_1_1
- _soap_uri_mapping = {
- NS_SOAP_ENV_1_1: '1.1',
- NS_SOAP_ENV_1_2: '1.2' }
- SOAP_ACTOR_NEXT_1_1 = 'http://schemas.xmlsoap.org/soap/actor/next'
- SOAP_ACTOR_NEXT_1_2 = 'http://www.w3.org/2001/06/soap-envelope/actor/next'
- SOAP_ACTOR_NEXT_ALL = (SOAP_ACTOR_NEXT_1_1, SOAP_ACTOR_NEXT_1_2)
-
- def SOAPUriToVersion(self, uri):
- value = self._soap_uri_mapping.get(uri)
- if value is not None:
- return value
-
- raise ValueError('Unsupported SOAP envelope uri: %s' % uri)
-
-
- def GetSOAPEnvUri(self, version):
- attrname = 'NS_SOAP_ENV_%s' % join(split(version, '.'), '_')
- value = getattr(self, attrname, None)
- if value is not None:
- return value
-
- raise ValueError('Unsupported SOAP version: %s' % version)
-
-
- def GetSOAPEncUri(self, version):
- attrname = 'NS_SOAP_ENC_%s' % join(split(version, '.'), '_')
- value = getattr(self, attrname, None)
- if value is not None:
- return value
-
- raise ValueError('Unsupported SOAP version: %s' % version)
-
-
- def GetSOAPActorNextUri(self, version):
- attrname = 'SOAP_ACTOR_NEXT_%s' % join(split(version, '.'), '_')
- value = getattr(self, attrname, None)
- if value is not None:
- return value
-
- raise ValueError('Unsupported SOAP version: %s' % version)
-
- NS_XSD_99 = 'http://www.w3.org/1999/XMLSchema'
- NS_XSI_99 = 'http://www.w3.org/1999/XMLSchema-instance'
- NS_XSD_00 = 'http://www.w3.org/2000/10/XMLSchema'
- NS_XSI_00 = 'http://www.w3.org/2000/10/XMLSchema-instance'
- NS_XSD_01 = 'http://www.w3.org/2001/XMLSchema'
- NS_XSI_01 = 'http://www.w3.org/2001/XMLSchema-instance'
- NS_XSD_ALL = (NS_XSD_99, NS_XSD_00, NS_XSD_01)
- NS_XSI_ALL = (NS_XSI_99, NS_XSI_00, NS_XSI_01)
- NS_XSD = NS_XSD_01
- NS_XSI = NS_XSI_01
- _xsd_uri_mapping = {
- NS_XSD_99: NS_XSI_99,
- NS_XSD_00: NS_XSI_00,
- NS_XSD_01: NS_XSI_01 }
- for key, value in _xsd_uri_mapping.items():
- _xsd_uri_mapping[value] = key
-
-
- def InstanceUriForSchemaUri(self, uri):
- return self._xsd_uri_mapping.get(uri)
-
-
- def SchemaUriForInstanceUri(self, uri):
- return self._xsd_uri_mapping.get(uri)
-
- NS_WSDL_1_1 = 'http://schemas.xmlsoap.org/wsdl/'
- NS_WSDL_ALL = (NS_WSDL_1_1,)
- NS_WSDL = NS_WSDL_1_1
- NS_SOAP_BINDING_1_1 = 'http://schemas.xmlsoap.org/wsdl/soap/'
- NS_HTTP_BINDING_1_1 = 'http://schemas.xmlsoap.org/wsdl/http/'
- NS_MIME_BINDING_1_1 = 'http://schemas.xmlsoap.org/wsdl/mime/'
- NS_SOAP_BINDING_ALL = (NS_SOAP_BINDING_1_1,)
- NS_HTTP_BINDING_ALL = (NS_HTTP_BINDING_1_1,)
- NS_MIME_BINDING_ALL = (NS_MIME_BINDING_1_1,)
- NS_SOAP_BINDING = NS_SOAP_BINDING_1_1
- NS_HTTP_BINDING = NS_HTTP_BINDING_1_1
- NS_MIME_BINDING = NS_MIME_BINDING_1_1
- NS_SOAP_HTTP_1_1 = 'http://schemas.xmlsoap.org/soap/http'
- NS_SOAP_HTTP_ALL = (NS_SOAP_HTTP_1_1,)
- NS_SOAP_HTTP = NS_SOAP_HTTP_1_1
- _wsdl_uri_mapping = {
- NS_WSDL_1_1: '1.1' }
-
- def WSDLUriToVersion(self, uri):
- value = self._wsdl_uri_mapping.get(uri)
- if value is not None:
- return value
-
- raise ValueError('Unsupported SOAP envelope uri: %s' % uri)
-
-
- def GetWSDLUri(self, version):
- attr = 'NS_WSDL_%s' % join(split(version, '.'), '_')
- value = getattr(self, attr, None)
- if value is not None:
- return value
-
- raise ValueError('Unsupported WSDL version: %s' % version)
-
-
- def GetWSDLSoapBindingUri(self, version):
- attr = 'NS_SOAP_BINDING_%s' % join(split(version, '.'), '_')
- value = getattr(self, attr, None)
- if value is not None:
- return value
-
- raise ValueError('Unsupported WSDL version: %s' % version)
-
-
- def GetWSDLHttpBindingUri(self, version):
- attr = 'NS_HTTP_BINDING_%s' % join(split(version, '.'), '_')
- value = getattr(self, attr, None)
- if value is not None:
- return value
-
- raise ValueError('Unsupported WSDL version: %s' % version)
-
-
- def GetWSDLMimeBindingUri(self, version):
- attr = 'NS_MIME_BINDING_%s' % join(split(version, '.'), '_')
- value = getattr(self, attr, None)
- if value is not None:
- return value
-
- raise ValueError('Unsupported WSDL version: %s' % version)
-
-
- def GetWSDLHttpTransportUri(self, version):
- attr = 'NS_SOAP_HTTP_%s' % join(split(version, '.'), '_')
- value = getattr(self, attr, None)
- if value is not None:
- return value
-
- raise ValueError('Unsupported WSDL version: %s' % version)
-
- NS_XMLNS = 'http://www.w3.org/2000/xmlns/'
-
- def isElement(self, node, name, nsuri = None):
- if node.nodeType != node.ELEMENT_NODE:
- return 0
-
- if not node.localName == name and nsuri is None:
- pass
- return self.nsUriMatch(node.namespaceURI, nsuri)
-
-
- def getElement(self, node, name, nsuri = None, default = join):
- nsmatch = self.nsUriMatch
- ELEMENT_NODE = node.ELEMENT_NODE
- for child in node.childNodes:
- if child.nodeType == ELEMENT_NODE:
- if child.localName == name or name is None:
- pass
- None if nsuri is None or nsmatch(child.namespaceURI, nsuri) else nsmatch(child.namespaceURI, nsuri)
- continue
-
- if default is not join:
- return default
-
- raise KeyError, name
-
-
- def getElementById(self, node, id, default = join):
- attrget = self.getAttr
- ELEMENT_NODE = node.ELEMENT_NODE
- for child in node.childNodes:
- if child.nodeType == ELEMENT_NODE:
- if attrget(child, 'id') == id:
- return child
-
- attrget(child, 'id') == id
-
- if default is not join:
- return default
-
- raise KeyError, name
-
-
- def getMappingById(self, document, depth = None, element = None, mapping = None, level = 1):
- if document is not None:
- element = document.documentElement
- mapping = { }
-
- attr = element._attrs.get('id', None)
- if attr is not None:
- mapping[attr.value] = element
-
- if depth is None or depth > level:
- level = level + 1
- ELEMENT_NODE = element.ELEMENT_NODE
- for child in element.childNodes:
- if child.nodeType == ELEMENT_NODE:
- self.getMappingById(None, depth, child, mapping, level)
- continue
-
-
- return mapping
-
-
- def getElements(self, node, name, nsuri = None):
- nsmatch = self.nsUriMatch
- result = []
- ELEMENT_NODE = node.ELEMENT_NODE
- for child in node.childNodes:
- if child.nodeType == ELEMENT_NODE:
- if child.localName == name or name is None:
- pass
- None if nsuri is None or nsmatch(child.namespaceURI, nsuri) else nsmatch(child.namespaceURI, nsuri)
- continue
-
- return result
-
-
- def hasAttr(self, node, name, nsuri = None):
- if nsuri is None:
- if node.hasAttribute(name):
- return True
-
- return False
-
- return node.hasAttributeNS(nsuri, name)
-
-
- def getAttr(self, node, name, nsuri = None, default = join):
- if nsuri is None:
- result = node._attrs.get(name, None)
- if result is None:
- for item in node._attrsNS.keys():
- if item[1] == name:
- result = node._attrsNS[item]
- break
- continue
-
-
- else:
- result = node._attrsNS.get((nsuri, name), None)
- if result is not None:
- return result.value
-
- if default is not join:
- return default
-
- return ''
-
-
- def getAttrs(self, node):
- attrs = { }
- for k, v in node._attrs.items():
- attrs[k] = v.value
-
- return attrs
-
-
- def getElementText(self, node, preserve_ws = None):
- result = []
- for child in node.childNodes:
- nodetype = child.nodeType
- if nodetype == child.TEXT_NODE or nodetype == child.CDATA_SECTION_NODE:
- result.append(child.nodeValue)
- continue
-
- value = join(result, '')
- if preserve_ws is None:
- value = strip(value)
-
- return value
-
-
- def findNamespaceURI(self, prefix, node):
- attrkey = (self.NS_XMLNS, prefix)
- DOCUMENT_NODE = node.DOCUMENT_NODE
- ELEMENT_NODE = node.ELEMENT_NODE
- while node is None:
- raise DOMException('Value for prefix %s not found.' % prefix)
- if node.nodeType != ELEMENT_NODE:
- node = node.parentNode
- continue
-
- result = node._attrsNS.get(attrkey, None)
- if result is not None:
- return result.value
-
- if hasattr(node, '__imported__'):
- raise DOMException('Value for prefix %s not found.' % prefix)
-
- node = node.parentNode
- if node.nodeType == DOCUMENT_NODE:
- raise DOMException('Value for prefix %s not found.' % prefix)
- continue
- continue
-
-
- def findDefaultNS(self, node):
- attrkey = (self.NS_XMLNS, 'xmlns')
- DOCUMENT_NODE = node.DOCUMENT_NODE
- ELEMENT_NODE = node.ELEMENT_NODE
- while node.nodeType != ELEMENT_NODE:
- node = node.parentNode
- continue
- result = node._attrsNS.get(attrkey, None)
- if result is not None:
- return result.value
-
- if hasattr(node, '__imported__'):
- raise DOMException('Cannot determine default namespace.')
-
- node = node.parentNode
- if node.nodeType == DOCUMENT_NODE:
- raise DOMException('Cannot determine default namespace.')
- continue
- continue
-
-
- def findTargetNS(self, node):
- attrget = self.getAttr
- attrkey = (self.NS_XMLNS, 'xmlns')
- DOCUMENT_NODE = node.DOCUMENT_NODE
- ELEMENT_NODE = node.ELEMENT_NODE
- while node.nodeType != ELEMENT_NODE:
- node = node.parentNode
- continue
- result = attrget(node, 'targetNamespace', default = None)
- if result is not None:
- return result
-
- node = node.parentNode
- if node.nodeType == DOCUMENT_NODE:
- raise DOMException('Cannot determine target namespace.')
- continue
- continue
-
-
- def getTypeRef(self, element):
- typeattr = self.getAttr(element, 'type', default = None)
- if typeattr is None:
- return None
-
- parts = typeattr.split(':', 1)
- if len(parts) == 2:
- nsuri = self.findNamespaceURI(parts[0], element)
- else:
- nsuri = self.findDefaultNS(element)
- return (nsuri, parts[1])
-
-
- def importNode(self, document, node, deep = 0):
- nodetype = node.nodeType
- if nodetype in (node.DOCUMENT_NODE, node.DOCUMENT_TYPE_NODE):
- raise DOMException('Illegal node type for importNode')
-
- if nodetype == node.ENTITY_REFERENCE_NODE:
- deep = 0
-
- clone = node.cloneNode(deep)
- self._setOwnerDoc(document, clone)
- clone.__imported__ = 1
- return clone
-
-
- def _setOwnerDoc(self, document, node):
- node.ownerDocument = document
- for child in node.childNodes:
- self._setOwnerDoc(document, child)
-
-
-
- def nsUriMatch(self, value, wanted, strict = 0, tt = type(())):
- if (value == wanted or type(wanted) is tt) and value in wanted:
- return 1
-
- if not strict and value is not None:
- if not type(wanted) is tt or wanted:
- pass
- wanted = (wanted,)
- if not value[-1:] != '/' or value:
- pass
- value = value[:-1]
- for item in wanted:
- if item == value or item[:-1] == value:
- return 1
- continue
-
-
- return 0
-
-
- def createDocument(self, nsuri, qname, doctype = None):
- impl = xml.dom.minidom.getDOMImplementation()
- return impl.createDocument(nsuri, qname, doctype)
-
-
- def loadDocument(self, data):
- return xml.dom.minidom.parse(data)
-
-
- def loadFromURL(self, url):
- if isfile(url) is True:
- file = open(url, 'r')
- else:
- file = urlopen(url)
-
- try:
- result = self.loadDocument(file)
- except Exception:
- ex = None
- file.close()
- raise ParseError(('Failed to load document %s' % url,) + ex.args)
-
- file.close()
- return result
-
-
- DOM = DOM()
-
- class MessageInterface:
-
- def __init__(self, sw):
- self.sw = None
- if type(sw) != weakref.ReferenceType and sw is not None:
- self.sw = weakref.ref(sw)
- else:
- self.sw = sw
-
-
- def AddCallback(self, func, *arglist):
- self.sw().AddCallback(func, *arglist)
-
-
- def Known(self, obj):
- return self.sw().Known(obj)
-
-
- def Forget(self, obj):
- return self.sw().Forget(obj)
-
-
- def canonicalize(self):
- raise NotImplementedError, ''
-
-
- def createDocument(self, namespaceURI = SOAP.ENV, localName = 'Envelope'):
- raise NotImplementedError, ''
-
-
- def createAppendElement(self, namespaceURI, localName):
- raise NotImplementedError, ''
-
-
- def findNamespaceURI(self, qualifiedName):
- raise NotImplementedError, ''
-
-
- def resolvePrefix(self, prefix):
- raise NotImplementedError, ''
-
-
- def setAttributeNS(self, namespaceURI, localName, value):
- raise NotImplementedError, ''
-
-
- def setAttributeType(self, namespaceURI, localName):
- raise NotImplementedError, ''
-
-
- def setNamespaceAttribute(self, namespaceURI, prefix):
- raise NotImplementedError, ''
-
-
-
- class ElementProxy(Base, MessageInterface):
- _soap_env_prefix = 'SOAP-ENV'
- _soap_enc_prefix = 'SOAP-ENC'
- _zsi_prefix = 'ZSI'
- _xsd_prefix = 'xsd'
- _xsi_prefix = 'xsi'
- _xml_prefix = 'xml'
- _xmlns_prefix = 'xmlns'
- _soap_env_nsuri = SOAP.ENV
- _soap_enc_nsuri = SOAP.ENC
- _zsi_nsuri = ZSI_SCHEMA_URI
- _xsd_nsuri = SCHEMA.XSD3
- _xsi_nsuri = SCHEMA.XSI3
- _xml_nsuri = XMLNS.XML
- _xmlns_nsuri = XMLNS.BASE
- standard_ns = {
- _xml_prefix: _xml_nsuri,
- _xmlns_prefix: _xmlns_nsuri }
- reserved_ns = {
- _soap_env_prefix: _soap_env_nsuri,
- _soap_enc_prefix: _soap_enc_nsuri,
- _zsi_prefix: _zsi_nsuri,
- _xsd_prefix: _xsd_nsuri,
- _xsi_prefix: _xsi_nsuri }
- name = None
- namespaceURI = None
-
- def __init__(self, sw, message = None):
- self._indx = 0
- MessageInterface.__init__(self, sw)
- Base.__init__(self)
- self._dom = DOM
- self.node = None
- if type(message) in (types.StringType, types.UnicodeType):
- self.loadFromString(message)
- elif isinstance(message, ElementProxy):
- self.node = message._getNode()
- else:
- self.node = message
- self.processorNss = self.standard_ns.copy()
- self.processorNss.update(self.reserved_ns)
-
-
- def __str__(self):
- return self.toString()
-
-
- def evaluate(self, expression, processorNss = None):
- XPath = XPath
- import Ft.Xml
- if not processorNss:
- context = XPath.Context.Context(self.node, processorNss = self.processorNss)
- else:
- context = XPath.Context.Context(self.node, processorNss = processorNss)
- nodes = expression.evaluate(context)
- return (map,)((lambda node: ElementProxy(self.sw, node)), nodes)
-
-
- def checkNode(self, namespaceURI = None, localName = None):
- if not namespaceURI:
- pass
- namespaceURI = self.namespaceURI
- if not localName:
- pass
- localName = self.name
- check = False
- if localName and self.node:
- check = self._dom.isElement(self.node, localName, namespaceURI)
-
- if not check:
- raise NamespaceError, 'unexpected node type %s, expecting %s' % (self.node, localName)
-
-
-
- def setNode(self, node = None):
- if node:
- if isinstance(node, ElementProxy):
- self.node = node._getNode()
- else:
- self.node = node
- elif self.node:
- node = self._dom.getElement(self.node, self.name, self.namespaceURI, default = None)
- if not node:
- raise NamespaceError, 'cant find element (%s,%s)' % (self.namespaceURI, self.name)
-
- self.node = node
- else:
- self.createDocument(self.namespaceURI, localName = self.name, doctype = None)
- self.checkNode()
-
-
- def _getNode(self):
- return self.node
-
-
- def _getElements(self):
- return self._dom.getElements(self.node, name = None)
-
-
- def _getOwnerDocument(self):
- if not self.node.ownerDocument:
- pass
- return self.node
-
-
- def _getUniquePrefix(self):
- while None:
- self._indx += 1
- prefix = 'ns%d' % self._indx
-
- try:
- self._dom.findNamespaceURI(prefix, self._getNode())
- continue
- except DOMException:
- self
- ex = self
- break
- continue
-
-
- return prefix
-
-
- def _getPrefix(self, node, nsuri):
-
- try:
- if node and node.nodeType == node.ELEMENT_NODE and nsuri == self._dom.findDefaultNS(node):
- return None
- except DOMException:
- ex = None
-
- if nsuri == XMLNS.XML:
- return self._xml_prefix
-
- if node.nodeType == Node.ELEMENT_NODE:
- for attr in node.attributes.values():
- if attr.namespaceURI == XMLNS.BASE and nsuri == attr.value:
- return attr.localName
- continue
- elif node.parentNode:
- return self._getPrefix(node.parentNode, nsuri)
-
-
- raise NamespaceError, 'namespaceURI "%s" is not defined' % nsuri
-
-
- def _appendChild(self, node):
- if node is None:
- raise TypeError, 'node is None'
-
- self.node.appendChild(node)
-
-
- def _insertBefore(self, newChild, refChild):
- self.node.insertBefore(newChild, refChild)
-
-
- def _setAttributeNS(self, namespaceURI, qualifiedName, value):
- self.node.setAttributeNS(namespaceURI, qualifiedName, value)
-
-
- def isFault(self):
- return False
-
-
- def getPrefix(self, namespaceURI):
-
- try:
- prefix = self._getPrefix(node = self.node, nsuri = namespaceURI)
- except NamespaceError:
- ex = None
- prefix = self._getUniquePrefix()
- self.setNamespaceAttribute(prefix, namespaceURI)
-
- return prefix
-
-
- def getDocument(self):
- return self._getOwnerDocument()
-
-
- def setDocument(self, document):
- self.node = document
-
-
- def importFromString(self, xmlString):
- doc = self._dom.loadDocument(StringIO(xmlString))
- node = self._dom.getElement(doc, name = None)
- clone = self.importNode(node)
- self._appendChild(clone)
-
-
- def importNode(self, node):
- if isinstance(node, ElementProxy):
- node = node._getNode()
-
- return self._dom.importNode(self._getOwnerDocument(), node, deep = 1)
-
-
- def loadFromString(self, data):
- self.node = self._dom.loadDocument(StringIO(data))
-
-
- def canonicalize(self):
- return Canonicalize(self.node)
-
-
- def toString(self):
- return self.canonicalize()
-
-
- def createDocument(self, namespaceURI, localName, doctype = None):
- prefix = self._soap_env_prefix
- if namespaceURI == self.reserved_ns[prefix]:
- qualifiedName = '%s:%s' % (prefix, localName)
- elif localName is localName:
- pass
- elif localName is None:
- self.node = self._dom.createDocument(None, None, None)
- return None
- else:
- raise KeyError, 'only support creation of document in %s' % self.reserved_ns[prefix]
- document = self._dom.createDocument(nsuri = namespaceURI, qname = qualifiedName, doctype = doctype)
- self.node = document.childNodes[0]
- for prefix, nsuri in self.reserved_ns.items():
- self._setAttributeNS(namespaceURI = self._xmlns_nsuri, qualifiedName = '%s:%s' % (self._xmlns_prefix, prefix), value = nsuri)
-
-
-
- def hasAttribute(self, namespaceURI, localName):
- return self._dom.hasAttr(self._getNode(), name = localName, nsuri = namespaceURI)
-
-
- def setAttributeType(self, namespaceURI, localName):
- self.logger.debug('setAttributeType: (%s,%s)', namespaceURI, localName)
- value = localName
- if namespaceURI:
- value = '%s:%s' % (self.getPrefix(namespaceURI), localName)
-
- xsi_prefix = self.getPrefix(self._xsi_nsuri)
- self._setAttributeNS(self._xsi_nsuri, '%s:type' % xsi_prefix, value)
-
-
- def createAttributeNS(self, namespace, name, value):
- document = self._getOwnerDocument()
- attrNode = document.createAttributeNS(namespace, name, value)
-
-
- def setAttributeNS(self, namespaceURI, localName, value):
- prefix = None
- if namespaceURI:
-
- try:
- prefix = self.getPrefix(namespaceURI)
- except KeyError:
- ex = None
- prefix = 'ns2'
- self.setNamespaceAttribute(prefix, namespaceURI)
- except:
- None<EXCEPTION MATCH>KeyError
-
-
- None<EXCEPTION MATCH>KeyError
- qualifiedName = localName
- if prefix:
- qualifiedName = '%s:%s' % (prefix, localName)
-
- self._setAttributeNS(namespaceURI, qualifiedName, value)
-
-
- def setNamespaceAttribute(self, prefix, namespaceURI):
- self._setAttributeNS(XMLNS.BASE, 'xmlns:%s' % prefix, namespaceURI)
-
-
- def createElementNS(self, namespace, qname):
- document = self._getOwnerDocument()
- node = document.createElementNS(namespace, qname)
- return ElementProxy(self.sw, node)
-
-
- def createAppendSetElement(self, namespaceURI, localName, prefix = None):
- node = self.createAppendElement(namespaceURI, localName, prefix = None)
- node = node._getNode()
- self._setNode(node._getNode())
-
-
- def createAppendElement(self, namespaceURI, localName, prefix = None):
- declare = False
- qualifiedName = localName
- if namespaceURI:
-
- try:
- prefix = self.getPrefix(namespaceURI)
- except:
- declare = True
- if not prefix:
- pass
- prefix = self._getUniquePrefix()
-
- if prefix:
- qualifiedName = '%s:%s' % (prefix, localName)
-
-
- node = self.createElementNS(namespaceURI, qualifiedName)
- if declare:
- node._setAttributeNS(XMLNS.BASE, 'xmlns:%s' % prefix, namespaceURI)
-
- self._appendChild(node = node._getNode())
- return node
-
-
- def createInsertBefore(self, namespaceURI, localName, refChild):
- qualifiedName = localName
- prefix = self.getPrefix(namespaceURI)
- if prefix:
- qualifiedName = '%s:%s' % (prefix, localName)
-
- node = self.createElementNS(namespaceURI, qualifiedName)
- self._insertBefore(newChild = node._getNode(), refChild = refChild._getNode())
- return node
-
-
- def getElement(self, namespaceURI, localName):
- node = self._dom.getElement(self.node, localName, namespaceURI, default = None)
- if node:
- return ElementProxy(self.sw, node)
-
-
-
- def getAttributeValue(self, namespaceURI, localName):
- if self.hasAttribute(namespaceURI, localName):
- attr = self.node.getAttributeNodeNS(namespaceURI, localName)
- return attr.value
-
-
-
- def getValue(self):
- return self._dom.getElementText(self.node, preserve_ws = True)
-
-
- def createAppendTextNode(self, pyobj):
- node = self.createTextNode(pyobj)
- self._appendChild(node = node._getNode())
- return node
-
-
- def createTextNode(self, pyobj):
- document = self._getOwnerDocument()
- node = document.createTextNode(pyobj)
- return ElementProxy(self.sw, node)
-
-
- def findNamespaceURI(self, qualifiedName):
- parts = SplitQName(qualifiedName)
- element = self._getNode()
- if len(parts) == 1:
- return (self._dom.findTargetNS(element), value)
-
- return self._dom.findNamespaceURI(parts[0], element)
-
-
- def resolvePrefix(self, prefix):
- element = self._getNode()
- return self._dom.findNamespaceURI(prefix, element)
-
-
- def getSOAPEnvURI(self):
- return self._soap_env_nsuri
-
-
- def isEmpty(self):
- return not (self.node)
-
-
-
- class Collection(UserDict):
-
- default = lambda self, k: k.name
-
- def __init__(self, parent, key = None):
- UserDict.__init__(self)
- self.parent = weakref.ref(parent)
- self.list = []
- if not key:
- pass
- self._func = self.default
-
-
- def __getitem__(self, key):
- if type(key) is type(1):
- return self.list[key]
-
- return self.data[key]
-
-
- def __setitem__(self, key, item):
- item.parent = weakref.ref(self)
- self.list.append(item)
- self.data[key] = item
-
-
- def keys(self):
- return (map,)((lambda i: self._func(i)), self.list)
-
-
- def items(self):
- return (map,)((lambda i: (self._func(i), i)), self.list)
-
-
- def values(self):
- return self.list
-
-
-
- class CollectionNS(UserDict):
-
- default = lambda self, k: k.name
-
- def __init__(self, parent, key = None):
- UserDict.__init__(self)
- self.parent = weakref.ref(parent)
- self.targetNamespace = None
- self.list = []
- if not key:
- pass
- self._func = self.default
-
-
- def __getitem__(self, key):
- self.targetNamespace = self.parent().targetNamespace
- if type(key) is types.IntType:
- return self.list[key]
- elif self._CollectionNS__isSequence(key):
- (nsuri, name) = key
- return self.data[nsuri][name]
-
- return self.data[self.parent().targetNamespace][key]
-
-
- def __setitem__(self, key, item):
- item.parent = weakref.ref(self)
- self.list.append(item)
- targetNamespace = getattr(item, 'targetNamespace', self.parent().targetNamespace)
- if not self.data.has_key(targetNamespace):
- self.data[targetNamespace] = { }
-
- self.data[targetNamespace][key] = item
-
-
- def __isSequence(self, key):
- if type(key) in (types.TupleType, types.ListType):
- pass
- return len(key) == 2
-
-
- def keys(self):
- keys = []
- for None in self.data.keys():
- tns = None
-
- return keys
-
-
- def items(self):
- return (map,)((lambda i: (self._func(i), i)), self.list)
-
-
- def values(self):
- return self.list
-
-
- from xml.dom.pulldom import PullDOM, START_ELEMENT
-
- def startPrefixMapping(self, prefix, uri):
- if not hasattr(self, '_xmlns_attrs'):
- self._xmlns_attrs = []
-
- if not prefix:
- pass
- self._xmlns_attrs.append(('xmlns', uri))
- self._ns_contexts.append(self._current_context.copy())
- if not prefix:
- pass
- self._current_context[uri] = ''
-
- PullDOM.startPrefixMapping = startPrefixMapping
-
- def startElementNS(self, name, tagName, attrs):
- xmlns_uri = 'http://www.w3.org/2000/xmlns/'
- xmlns_attrs = getattr(self, '_xmlns_attrs', None)
- if xmlns_attrs is not None:
- for aname, value in xmlns_attrs:
- attrs._attrs[(xmlns_uri, aname)] = value
-
- self._xmlns_attrs = []
-
- (uri, localname) = name
- if uri:
- if tagName is None:
- prefix = self._current_context[uri]
- if prefix:
- tagName = prefix + ':' + localname
- else:
- tagName = localname
-
- if self.document:
- node = self.document.createElementNS(uri, tagName)
- else:
- node = self.buildDocument(uri, tagName)
- elif self.document:
- node = self.document.createElement(localname)
- else:
- node = self.buildDocument(None, localname)
- for aname, value in attrs.items():
- (a_uri, a_localname) = aname
- if a_uri == xmlns_uri:
- if a_localname == 'xmlns':
- qname = a_localname
- else:
- qname = 'xmlns:' + a_localname
- attr = self.document.createAttributeNS(a_uri, qname)
- node.setAttributeNodeNS(attr)
- elif a_uri:
- prefix = self._current_context[a_uri]
- if prefix:
- qname = prefix + ':' + a_localname
- else:
- qname = a_localname
- attr = self.document.createAttributeNS(a_uri, qname)
- node.setAttributeNodeNS(attr)
- else:
- attr = self.document.createAttribute(a_localname)
- node.setAttributeNode(attr)
- attr.value = value
-
- self.lastEvent[1] = [
- (START_ELEMENT, node),
- None]
- self.lastEvent = self.lastEvent[1]
- self.push(node)
-
- PullDOM.startElementNS = startElementNS
-
- def _clone_node(node, deep, newOwnerDocument):
- if node.ownerDocument.isSameNode(newOwnerDocument):
- operation = xml.dom.UserDataHandler.NODE_CLONED
- else:
- operation = xml.dom.UserDataHandler.NODE_IMPORTED
- if node.nodeType == xml.dom.minidom.Node.ELEMENT_NODE:
- clone = newOwnerDocument.createElementNS(node.namespaceURI, node.nodeName)
- for attr in node.attributes.values():
- clone.setAttributeNS(attr.namespaceURI, attr.nodeName, attr.value)
- (prefix, tag) = xml.dom.minidom._nssplit(attr.nodeName)
- if prefix == 'xmlns':
- a = clone.getAttributeNodeNS(attr.namespaceURI, tag)
- elif prefix:
- a = clone.getAttributeNodeNS(attr.namespaceURI, tag)
- else:
- a = clone.getAttributeNodeNS(attr.namespaceURI, attr.nodeName)
- a.specified = attr.specified
-
- if deep:
- for child in node.childNodes:
- c = xml.dom.minidom._clone_node(child, deep, newOwnerDocument)
- clone.appendChild(c)
-
-
- elif node.nodeType == xml.dom.minidom.Node.DOCUMENT_FRAGMENT_NODE:
- clone = newOwnerDocument.createDocumentFragment()
- if deep:
- for child in node.childNodes:
- c = xml.dom.minidom._clone_node(child, deep, newOwnerDocument)
- clone.appendChild(c)
-
-
- elif node.nodeType == xml.dom.minidom.Node.TEXT_NODE:
- clone = newOwnerDocument.createTextNode(node.data)
- elif node.nodeType == xml.dom.minidom.Node.CDATA_SECTION_NODE:
- clone = newOwnerDocument.createCDATASection(node.data)
- elif node.nodeType == xml.dom.minidom.Node.PROCESSING_INSTRUCTION_NODE:
- clone = newOwnerDocument.createProcessingInstruction(node.target, node.data)
- elif node.nodeType == xml.dom.minidom.Node.COMMENT_NODE:
- clone = newOwnerDocument.createComment(node.data)
- elif node.nodeType == xml.dom.minidom.Node.ATTRIBUTE_NODE:
- clone = newOwnerDocument.createAttributeNS(node.namespaceURI, node.nodeName)
- clone.specified = True
- clone.value = node.value
- elif node.nodeType == xml.dom.minidom.Node.DOCUMENT_TYPE_NODE:
- operation = xml.dom.UserDataHandler.NODE_IMPORTED
- clone = newOwnerDocument.implementation.createDocumentType(node.name, node.publicId, node.systemId)
- clone.ownerDocument = newOwnerDocument
- if deep:
- clone.entities._seq = []
- clone.notations._seq = []
- for n in node.notations._seq:
- notation = xml.dom.minidom.Notation(n.nodeName, n.publicId, n.systemId)
- notation.ownerDocument = newOwnerDocument
- clone.notations._seq.append(notation)
- if hasattr(n, '_call_user_data_handler'):
- n._call_user_data_handler(operation, n, notation)
- continue
-
- for e in node.entities._seq:
- entity = xml.dom.minidom.Entity(e.nodeName, e.publicId, e.systemId, e.notationName)
- entity.actualEncoding = e.actualEncoding
- entity.encoding = e.encoding
- entity.version = e.version
- entity.ownerDocument = newOwnerDocument
- clone.entities._seq.append(entity)
- if hasattr(e, '_call_user_data_handler'):
- e._call_user_data_handler(operation, n, entity)
- continue
-
-
- else:
- raise xml.dom.NotSupportedErr('Cannot clone node %s' % repr(node))
- if hasattr(node, '_call_user_data_handler'):
- node._call_user_data_handler(operation, node, clone)
-
- return clone
-
- xml.dom.minidom._clone_node = _clone_node
-